package com.mqtt.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;

/**
 * Created by Administrator on 2018/3/21.
 */
@Configuration
public class MqttConfig {
    private static Logger logger =  LoggerFactory.getLogger(MqttConfig.class);
    @Value("${iot.baidu.mqtt.url}")
    private String mqttUrl;//连接路径
    @Value("${iot.baidu.mqtt.principal}")
    private String mqttpPrincipal;//身份
    @Value("${iot.baidu.mqtt.password}")
    private String mqttpPassword;//身份密钥
    @Value("${iot.baidu.mqtt.topic}")
    private String mqttpTopic;//mqtt默认订阅主题
    @Value("${iot.baidu.mqtt.clientId}")
    private String mqttpClientId;//mqtt默认订阅主题
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 配置连接工厂
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttUrl);
        factory.setUserName(mqttpPrincipal);
        factory.setPassword(mqttpPassword);
        return factory;
    }
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    /**
     * Inbound(消息驱动)通道适配器,用于接受消息
     * @return
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(
                mqttpClientId//clientId,每次不同的连接需要不同的clientId,不然新的clientId会挤掉旧的clientID
                ,mqttClientFactory()//连接工厂
                ,mqttpTopic);//主题-topic
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

}
